package com.tanhua.spark.mongo;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.bson.Document;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Scheduled job that builds short-video recommendations.
 *
 * <p>Each run loads rating documents from MongoDB through the Mongo Spark connector,
 * merges the scores of identical (userId, videoId) pairs, asks {@code MLlibRecommend}
 * for the best matrix-factorization model and stores, for every user seen in the data,
 * a comma-separated list of recommended video ids in Redis under the key
 * {@code QUANZI_VIDEO_RECOMMEND_<userId>}.
 *
 * <p>Configuration is read from environment variables first; values that are not set
 * there fall back to {@code app-video.properties} on the classpath.
 */
public class SparkVideo {

    private static String MONGODB_HOST = System.getenv("MONGODB_HOST");
    private static String MONGODB_PORT = System.getenv("MONGODB_PORT");
    private static String MONGODB_USERNAME = System.getenv("MONGODB_USERNAME");
    private static String MONGODB_PASSWORD = System.getenv("MONGODB_PASSWORD");
    private final static String MONGODB_DATABASE = System.getenv("MONGODB_DATABASE") == null ? "tanhua" : System.getenv("MONGODB_DATABASE");
    private final static String MONGODB_COLLECTION = System.getenv("MONGODB_COLLECTION") == null ? "recommend_video" : System.getenv("MONGODB_COLLECTION");

    /** Comma-separated {@code host:port} list; a single entry means a standalone Redis node. */
    private static String REDIS_NODES = System.getenv("REDIS_NODES");

    /** Interval between two runs, in minutes (defaults to 10). */
    private final static Integer SCHEDULE_PERIOD = System.getenv("SCHEDULE_PERIOD") == null ? 10 : Integer.valueOf(System.getenv("SCHEDULE_PERIOD"));

    /** Number of videos recommended per user. */
    private static final int RECOMMEND_COUNT = 20;

    static {
        // Load the external configuration file app-video.properties. Environment
        // variables take precedence; the file only fills in values that are missing.
        Properties properties = new Properties();
        try (InputStream inputStream = SparkVideo.class.getClassLoader().getResourceAsStream("app-video.properties")) {
            if (inputStream != null) {
                properties.load(inputStream);
            } else {
                // getResourceAsStream returns null when the resource is absent; loading
                // from null would abort class initialization with an NPE.
                System.err.println("app-video.properties not found on classpath, using environment variables only");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        MONGODB_HOST = MONGODB_HOST == null ? properties.getProperty("mongodb.host") : MONGODB_HOST;
        MONGODB_PORT = MONGODB_PORT == null ? properties.getProperty("mongodb.port") : MONGODB_PORT;
        MONGODB_USERNAME = MONGODB_USERNAME == null ? properties.getProperty("mongodb.username") : MONGODB_USERNAME;
        MONGODB_PASSWORD = MONGODB_PASSWORD == null ? properties.getProperty("mongodb.password") : MONGODB_PASSWORD;
        REDIS_NODES = REDIS_NODES == null ? properties.getProperty("redis.cluster.nodes") : REDIS_NODES;
    }

    /**
     * Starts the scheduler that runs {@link #execute()} immediately and then every
     * {@code SCHEDULE_PERIOD} minutes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runnable runnable = () -> {
            try {
                execute();
            } catch (Exception e) {
                // An exception escaping the task would cancel all further scheduled
                // runs, so it is reported here and the schedule keeps going.
                e.printStackTrace();
            }
        };
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

        // Second argument: initial delay; third argument: period between runs.
        service.scheduleAtFixedRate(runnable, 0, SCHEDULE_PERIOD, TimeUnit.MINUTES);
    }

    /**
     * Runs one complete recommendation pass: load ratings, train, write to Redis.
     *
     * @throws IllegalStateException    if no Redis node is configured
     * @throws IllegalArgumentException if a Redis node entry is not of the form {@code host:port}
     * @throws Exception                any failure raised by Spark, MongoDB or Redis
     */
    public static void execute() throws Exception {

        // Validate the Redis configuration up front so a misconfiguration fails fast
        // instead of after the (expensive) model training.
        Set<HostAndPort> nodes = parseRedisNodes(REDIS_NODES);

        // Build the Spark configuration.
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkVideo")
                .setMaster("local[*]")
                .set("spark.driver.host", "localhost")
                .set("spark.mongodb.input.uri", "mongodb://" + MONGODB_USERNAME + ":" + MONGODB_PASSWORD + "@" + MONGODB_HOST + ":" + MONGODB_PORT + "/admin?readPreference=primaryPreferred")
                .set("spark.mongodb.input.database",MONGODB_DATABASE)
                .set("spark.mongodb.input.collection",MONGODB_COLLECTION);

        // Build the Spark context.
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            // Load the data from MongoDB.
            JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);

            // The same user may have rated the same video several times; merge those
            // documents by summing their scores.
            JavaRDD<Document> values = rdd.mapToPair(document -> {
                Integer user = document.getLong("userId").intValue();
                Integer product = document.getLong("videoId").intValue();
                return new Tuple2<>(user + "_" + product, document);
            }).reduceByKey((v1, v2) -> {
                Double score = v1.getDouble("score") + v2.getDouble("score");
                v1.put("score", score);
                return v1;
            }).values();

            // Collect the distinct user ids present in the data.
            List<Long> userIdList = rdd.map(v1 -> v1.getLong("userId")).distinct().collect();

            // Key every Rating by (date mod 10); the keyed ratings are handed to
            // MLlibRecommend for model selection.
            JavaPairRDD<Long, Rating> ratings = values.mapToPair(document -> {
                Integer user = document.getLong("userId").intValue();
                Integer product = document.getLong("videoId").intValue();
                Double score = document.getDouble("score");
                Long date = document.getLong("date");
                Rating rating = new Rating(user, product, score);
                return new Tuple2<>(date % 10, rating);
            });

            // Let MLlibRecommend pick the best recommendation model.
            MLlibRecommend mLlibRecommend = new MLlibRecommend();
            MatrixFactorizationModel bestModel = mLlibRecommend.bestModel(ratings);

            saveToRedis(nodes, userIdList, bestModel);
        } finally {
            // Always release the context: this method is invoked repeatedly by the
            // scheduler in the same JVM, and a leaked context would make the next
            // run's JavaSparkContext construction fail.
            jsc.close();
        }
    }

    /**
     * Parses a comma-separated {@code host:port} list into Redis node addresses.
     *
     * @param redisNodes the configured node list, may be {@code null}
     * @return the parsed, non-empty set of nodes
     * @throws IllegalStateException    if the list is {@code null} or contains no entries
     * @throws IllegalArgumentException if an entry is not of the form {@code host:port}
     */
    private static Set<HostAndPort> parseRedisNodes(String redisNodes) {
        if (redisNodes == null) {
            throw new IllegalStateException("Redis nodes are not configured (REDIS_NODES / redis.cluster.nodes)");
        }
        Set<HostAndPort> nodes = new HashSet<>();
        for (String redisNode : redisNodes.split(",")) {
            String trimmed = redisNode.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] hostAndPort = trimmed.split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException("Invalid Redis node, expected host:port but got: " + trimmed);
            }
            nodes.add(new HostAndPort(hostAndPort[0].trim(), Integer.parseInt(hostAndPort[1].trim())));
        }
        if (nodes.isEmpty()) {
            throw new IllegalStateException("Redis nodes are not configured (REDIS_NODES / redis.cluster.nodes)");
        }
        return nodes;
    }

    /**
     * Writes the top recommendations of every user to Redis.
     *
     * <p>A single configured node is treated as a standalone Redis server, several
     * nodes as a Redis cluster. The connection is closed even if writing fails.
     *
     * @param nodes      Redis node addresses
     * @param userIdList ids of the users to produce recommendations for
     * @param bestModel  trained model used to compute the recommendations
     */
    private static void saveToRedis(Set<HostAndPort> nodes, List<Long> userIdList, MatrixFactorizationModel bestModel) {
        JedisCluster jedisCluster = null;
        Jedis jedis = null;
        if (nodes.size() == 1) {
            // Standalone node.
            HostAndPort hostAndPort = nodes.iterator().next();
            jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
        } else {
            jedisCluster = new JedisCluster(nodes);
        }

        try {
            for (Long userId : userIdList) {
                Rating[] recommendProducts = bestModel.recommendProducts(userId.intValue(), RECOMMEND_COUNT);

                List<Integer> products = new ArrayList<>(recommendProducts.length);
                for (Rating product : recommendProducts) {
                    products.add(product.product());
                }

                String key = "QUANZI_VIDEO_RECOMMEND_" + userId;
                String value = StringUtils.join(products, ',');

                if (null != jedis) {
                    jedis.set(key, value);
                } else {
                    jedisCluster.set(key, value);
                }
            }
        } finally {
            if (null != jedis) {
                jedis.close();
            } else {
                jedisCluster.close();
            }
        }
    }
}
